Dagsterで開発したデータパイプラインの処理をテストする
大阪オフィスの玉井です。
データオーケストレーションツールのDagsterは、開発したデータパイプライン(の処理)に対して、テストコードを書くことができます。
Dagsterにおけるテスト
データパイプラインを開発して、それを統率することができるDagsterですが、1つ1つの処理に対して「生成されるデータが要件通りかどうか」という観点でテストが書けるのがDagsterの特徴となっています。
これを利用して、SolidやPipeline毎に単体テストを実装することで、生成されるデータの品質を担保することができます。
やってみた
参考
こちらのチュートリアルは、Solidと同じファイルにテストコードも書いていますが、当記事では、テストファイルは分けてやってみます。
検証環境
- macOS Catalina 10.15.7
- Python 3.9.4
- pytest 6.2.3
- Dagster 0.10.9
テスト対象のSolidとPipeline
まず、テストするSolidとPipelineを用意します。…といっても、こちらで使ったものをそのまま流用します。
import csv import os from dagster import execute_pipeline, pipeline, solid @solid def hello_cereal(context): # データセットがこのファイルと同じディレクトリにあると仮定 dataset_path = os.path.join(os.path.dirname(__file__), "cereal.csv") with open(dataset_path, "r") as fd: # 標準のcsvライブラリを使用して行を読み込む cereals = [row for row in csv.DictReader(fd)] context.log.info( "シリアルに関するデータは{n_cereals}件あります。".format(n_cereals=len(cereals)) ) return cereals @pipeline def hello_cereal_pipeline(): hello_cereal()
テストファイルを作成する
それでは早速テストを書いていきます。
Dagsterにおけるテストというと、何か特別な機能とかを使うような気がしますが、今回の処理内容はPythonで書かれているので、実際に記述するのは、いたって普通のPythonのassert文です。そして、テストの実行はpytest
を使います。ただし、テストを書くのに便利なDagster用のライブラリがあるので、それを利用します。
import csv import os import hello_cereal_ta from dagster import execute_solid, execute_pipeline, pipeline, solid def test_hello_cereal_solid(): res = execute_solid(hello_cereal_ta.hello_cereal) # 処理が成功したかどうか assert res.success # 読み込んだファイルのレコード数が77件かどうか assert len(res.output_value()) == 77 def test_hello_cereal_pipeline(): # やってることはsolidのテストと同じ res = execute_pipeline(hello_cereal_ta.hello_cereal_pipeline) assert res.success assert len(res.result_for_solid("hello_cereal").output_value()) == 77
execute_solid
とexecute_pipeline
というものを使ってテストを書いています。これを使うことで、SolidやPipelineの実行結果(のオブジェクト)を取得することができます。このオブジェクトに対して色々な処理がかけられるので、非常に簡単にテストを書くことが出来ます。
テストする
テストコードを実際に実行してみます。pytest
でサクッとテストしてみましょう。
$ pytest test_hello_cereal.py ================================================================== test session starts =================================================================== platform darwin -- Python 3.9.4, pytest-6.2.3, py-1.10.0, pluggy-0.13.1 rootdir: xxxxxxxxx/test_dagster_pipeline collected 2 items test_hello_cereal.py .. [100%] =================================================================== 2 passed in 1.74s ====================================================================
無事、テストが実行され、テスト自体も通過しました。
おわりに
今回は各処理単位のテスト、要するに単体テストレベルのものをやってみました。各処理に対するテストは、今回の方法で実行することができると思います。しかし、(当然のことながら)テストは単体テストだけではありません。
Dagsterにはユニットテストを行う仕組みもあります。そちらの方も試して、どこかのタイミングで記事にしたいと思います。